
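/*
    An accumulator accepts two kinds of parameters:
      1. configuration parameters (RealConfValue), which express what is required;
      2. data parameters (PhysicalPlanExpr), which let the accumulator compute
         flexibly over the data. In this file they are parsed as
         LogicalPlanExprFuncOrValue.
    Based on the above, the accumulator configuration format is tentatively:
        accumulator_name(RealConfValue,...,RealConfValue;LogicalPlanExprFunc,...LogicalPlanExprFunc)
    Either kind of parameter may be empty.
*/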

use crate::data_frame::value::{RealConfValue, ConfValue};
use crate::logical_plan::expr::func::{LogicalPlanExprFuncOrValue, parse_logical_expr_real_value, parse_logical_expr_func_or_value_single};
use sapeur::{Parser, parse_text, ParserResult};
use sapeur::text::{TextChar, text_ident, text_just, text_key_world};
use sapeur::logical_comb::LogicalCombOutput;
use crate::conf_init::{MapResult, MapErr};

/// A parsed accumulator expression of the form `name(conf, ...; expr, ...)`.
#[derive(Debug)]
pub struct LogicalPlanExprAccumulator(
    /// Accumulator name, as produced by the identifier parser.
    pub(crate) String,
    /// Configuration parameters (the group before `;`); may be empty.
    pub(crate) Vec<RealConfValue>,
    /// Data parameters (the group after `;`); may be empty.
    pub(crate) Vec<LogicalPlanExprFuncOrValue>,
);

/// Intermediate parse result for one accumulator argument, used while the
/// parser has not yet decided which parameter group the argument belongs to.
#[derive(Debug)]
enum LogicalPlanExprAccumulatorAssist {
    /// The argument parsed as a configuration value (literal or config key).
    RealConfValue(RealConfValue),
    /// The argument parsed as a data expression (function call or value).
    LogicalPlanExprFuncOrValue(LogicalPlanExprFuncOrValue),
}

/// Parser for one configuration argument.
///
/// Tries a literal first (wrapped as `RealConfValue::RealValue`) and falls back
/// to a key word (wrapped as `RealConfValue::ConfValue` via `ConfValue::new`).
/// Either way the result is the `RealConfValue` variant of
/// `LogicalPlanExprAccumulatorAssist`.
fn real_or_conf_value<'a>() -> impl Parser<char, LogicalPlanExprAccumulatorAssist, TextChar> + 'a {
    parse_logical_expr_real_value()
        .map_end(|literal| {
            let value = RealConfValue::RealValue(literal);
            LogicalPlanExprAccumulatorAssist::RealConfValue(value)
        })
        .or(text_key_world().map_end(|key| {
            let value = RealConfValue::ConfValue(ConfValue::new(key));
            LogicalPlanExprAccumulatorAssist::RealConfValue(value)
        }))
}

pub fn parse_logical_expr_accumulator_single<'a>()
    ->impl Parser<char, LogicalPlanExprAccumulator, TextChar>+'a {

        let func_no_arg = text_ident()
            .then_ignore(text_just("(").expand_by_repeated(text_just(" ")))
            .then_ignore(text_just(")").expand_by_repeated(text_just(" ")))
            .map_end(|n| LogicalPlanExprAccumulator(n, Vec::new(), Vec::new()));

        let func_only_conf = text_ident()
            .and(
                real_or_conf_value()
                    .separated_by(text_just(",").expand_by_repeated(text_just(" ")), false)
                    .delimited_by(text_just("(").expand_by_repeated(text_just(" ")),
                        text_just(")").expand_by_repeated(text_just(" ")))
            )
            .map_end_option(|(n,o)| {
                let mut tmp = Vec::new();
                for t in o.into_iter() {
                    match t {
                        LogicalPlanExprAccumulatorAssist::RealConfValue(t) => {
                            tmp.push(t);
                        },
                        LogicalPlanExprAccumulatorAssist::LogicalPlanExprFuncOrValue(_) => {
                            return None;
                        },
                    }
                }
                return Some(LogicalPlanExprAccumulator(n, tmp, Vec::new()));
            });

        let func_only_data = text_ident()
            .and(
                parse_logical_expr_func_or_value_single()
                    .separated_by(text_just(",").expand_by_repeated(text_just(" ")), false)
                    .delimited_by(text_just("(").expand_by_repeated(text_just(" ")),
                                  text_just(")").expand_by_repeated(text_just(" ")))
            )
            .map_end(|(n,o)|
                LogicalPlanExprAccumulator(n, Vec::new(), o)
            );

        let conf_or_func = real_or_conf_value().or(
            parse_logical_expr_func_or_value_single()
                .map_end(|o| LogicalPlanExprAccumulatorAssist::LogicalPlanExprFuncOrValue(o))
        );

        let func_mixed = text_ident()
            .and(
                conf_or_func
                    .separated_by(text_just(",").expand_by_repeated(text_just(" ")), false)
                    .separated_by(text_just(";").expand_by_repeated(text_just(" ")), false)
                    .delimited_by(text_just("(").expand_by_repeated(text_just(" ")),
                                  text_just(")").expand_by_repeated(text_just(" ")))
            )
            .map_end_option(|(n,o)| {
                if o.len() == 2{
                    let left = &o[0];
                    let right = &o[1];
                    let mut conf: Vec<RealConfValue> = Vec::new();
                    let mut func = Vec::new();
                    for t in left.iter(){
                        match t{
                            LogicalPlanExprAccumulatorAssist::RealConfValue(t) => {
                                conf.push(t.clone());
                            },
                            LogicalPlanExprAccumulatorAssist::LogicalPlanExprFuncOrValue(_) => {
                                return None;
                            },
                        }
                    }
                    for t in right.iter(){
                        match t{
                            LogicalPlanExprAccumulatorAssist::RealConfValue(t) => {
                                func.push(LogicalPlanExprFuncOrValue::Value(t.clone().up()));
                            },
                            LogicalPlanExprAccumulatorAssist::LogicalPlanExprFuncOrValue(t) => {
                                func.push(t.clone());
                            },
                        }
                    }
                    return Some(LogicalPlanExprAccumulator(n, conf, func));
                }else{
                    return None;
                }
            });

        func_no_arg
            .or(func_only_conf)
            .or(func_only_data)
            .or(func_mixed)
}

/// Builds the parser for a logical combination of accumulator expressions.
///
/// Single accumulator expressions are handed to `logical_comb` together with
/// four tokens, in this order: `(`, `)`, `&&` and `||`. Each token is built as
/// `text_just(tok).expand_by_repeated(text_just(" "))`.
pub fn parse_logical_expr_accumulator_comb<'a>()
    -> impl Parser<char, LogicalCombOutput<LogicalPlanExprAccumulator>, TextChar> + 'a {

    let single = parse_logical_expr_accumulator_single();

    let group_open = text_just("(").expand_by_repeated(text_just(" "));
    let group_close = text_just(")").expand_by_repeated(text_just(" "));
    let and_token = text_just("&&").expand_by_repeated(text_just(" "));
    let or_token = text_just("||").expand_by_repeated(text_just(" "));

    single.logical_comb(group_open, group_close, and_token, or_token)
}

/// Parses an accumulator configuration string into a logical combination of
/// accumulator expressions.
///
/// `desc` is the raw configuration text. The result is `MapResult::Ok` only
/// when the parser reports `ParserResult::OkEnd`. Every other outcome is logged
/// with `warn!` and returned as `MapResult::Err`, with a message that contains
/// `desc` and, for the two error outcomes, the failing offset.
pub fn conf_parse_logical_expr_accumulator_comb(desc: &str)
    ->MapResult<LogicalCombOutput<LogicalPlanExprAccumulator>>{

    // The log line names this function; it previously carried the name of a
    // different parser entry point.
    info!("conf_parse_logical_expr_accumulator_comb [{}]", desc);

    // NOTE(review): the meaning of the `["no"]` list is defined by
    // `sapeur::parse_text` and is not visible here - confirm against that API.
    let res = parse_text(desc,
                         &parse_logical_expr_accumulator_comb(),
                         vec!["no"].as_slice());
    match res {
        ParserResult::OkEnd(t) => MapResult::Ok(t),
        ParserResult::Ok(_) => {
            // The parser succeeded but did not report `OkEnd`; this is
            // treated as a failure.
            warn!("conf_parse_logical_expr_accumulator_comb failed: not to OkEnd");
            MapResult::Err(MapErr::new(format!("[{}] parse Ok not to OkEnd", desc)))
        },
        ParserResult::Err(e) => {
            warn!("conf_parse_logical_expr_accumulator_comb failed: at offset {}", e.offset);
            MapResult::Err(MapErr::new(format!("[{}] Err at offset [{}]",
                                               desc, e.offset)))
        },
        ParserResult::ErrEnd(e) => {
            warn!("conf_parse_logical_expr_accumulator_comb failed: at offset {}", e.offset);
            MapResult::Err(MapErr::new(format!("[{}] ErrEnd at offset [{}]",
                                               desc, e.offset)))
        },
    }
}